package com.strongbj.iot.mq.producer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.strongbj.core.util.ContextUtils;
import com.strongbj.core.util.PropertiesUtil;

/**
 * Publishes text messages to a named topic through either Kafka or JMS.
 *
 * <p>The transport is chosen once, in the constructor, from the
 * {@code application.communication.mode} property: the exact value
 * {@code "kafka"} selects the {@link KafkaTemplate}; any other value
 * (including a missing property) selects the {@link JmsTemplate} bean named
 * {@code jmsTopicTemplate}. Only the template for the selected transport is
 * looked up; the other field stays {@code null}.
 */
@Component("topicSender")
public class TopicSender {
	private static final Logger logger = LogManager.getLogger(TopicSender.class.getName());

	/** Property key that selects the transport. */
	private static final String MODE_PROPERTY = "application.communication.mode";
	/** Mode value that selects Kafka. */
	private static final String MODE_KAFKA = "kafka";
	/** Mode value used for every non-Kafka configuration (JMS). */
	private static final String MODE_MQ = "mq";
	/** Name of the JMS template bean used for topic publishing. */
	private static final String JMS_TEMPLATE_BEAN = "jmsTopicTemplate";

	/** JMS template; {@code null} when the Kafka transport is selected. */
	private final JmsTemplate jmsTemplate;
	/** Kafka template; {@code null} when the JMS transport is selected. */
	private final KafkaTemplate<String, Object> kafkaTemplate;
	/** Effective transport mode: either {@code "kafka"} or {@code "mq"}. */
	private final String applicationCommunicationMode;

	/**
	 * Reads the configured communication mode and obtains the matching
	 * template through {@code ContextUtils}.
	 */
	// The cast to KafkaTemplate<String, Object> cannot be checked at runtime
	// because the bean is looked up by its raw class.
	@SuppressWarnings("unchecked")
	public TopicSender() {
		String configuredMode = PropertiesUtil.getString(MODE_PROPERTY);
		// Constant-first equals is null-safe, so a missing property falls through to JMS.
		if (MODE_KAFKA.equals(configuredMode)) {
			applicationCommunicationMode = MODE_KAFKA;
			kafkaTemplate = (KafkaTemplate<String, Object>) ContextUtils.getBean(KafkaTemplate.class);
			jmsTemplate = null;
		} else {
			applicationCommunicationMode = MODE_MQ;
			jmsTemplate = (JmsTemplate) ContextUtils.getBean(JMS_TEMPLATE_BEAN);
			kafkaTemplate = null;
		}
	}

	/**
	 * Sends one message to the given topic (destination).
	 *
	 * <p>In Kafka mode the message is sent as the record value and the send is
	 * logged at info level. Otherwise the message is sent as a JMS text message
	 * to the destination named {@code topicName}.
	 *
	 * @param topicName name of the topic / destination to publish to
	 * @param message   message content
	 */
	public void send(String topicName, final String message) {
		if (MODE_KAFKA.equals(applicationCommunicationMode)) {
			kafkaTemplate.send(topicName, message);
			logger.info("发送kafka消息:topic=={},消息=={}",topicName,message);
		} else {
			jmsTemplate.send(topicName, new MessageCreator() {
				@Override
				public Message createMessage(Session session) throws JMSException {
					return session.createTextMessage(message);
				}
			});
		}
	}

}
